By Shreyas Kaundinya
Useful Links
Meaning of at-least once, at-most once and exactly-once delivery
Architecture
Introduction
- horizontally scalable messaging system
- easy, faster data processing
- imparts a uniform processing load; instances can be recreated and moved easily
- load balancing is easy
- easier API
- works well with Kafka Streams
- simple client library for processing and analyzing data stored in Kafka
- fault-tolerant local state ⇒ allows stateful operations such as windowed joins and aggregations
- supports exactly-once processing, even when there is a failure
- one-record-at-a-time processing and event-time based windowing operations
- offers the necessary stream processing primitives along with a high-level Streams DSL (map, filter, join, aggregations) and a low-level Processor API
Concepts
Processors in topology
Source Processor
- has no upstream processors
- produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to downstream processors
Sink Processor
- no downstream processors
- sends any received records from its upstream processors to a specified Kafka topic
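The source and sink roles can be pictured with a small plain-Java model. This is only a sketch and not the Kafka Streams API: the class name TopologySketch is made up, and the two lists stand in for an input topic and an output topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of a three-node topology; not the Kafka Streams API.
final class TopologySketch {
    // Pushes every record of the "source topic" through one processing step
    // and collects the results in a list that stands in for the "sink topic".
    static List<String> run(List<String> sourceTopic, Function<String, String> processor) {
        List<String> sinkTopic = new ArrayList<>();
        for (String record : sourceTopic) {              // source: no upstream, reads the topic
            String processed = processor.apply(record);  // intermediate processor
            sinkTopic.add(processed);                    // sink: no downstream, writes the topic
        }
        return sinkTopic;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b"), String::toUpperCase));
    }
}
```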
Time
Event Time
- point in time when the event occurred
- creation time at source
Processing Time
- point in time when the event or data record happens to be processed by the stream processing application
- processing time may be milliseconds, hours, or days later than the original event time
Ingestion Time
- point in time when the record is stored in the partition by the broker
Kafka Streams assigns a timestamp to every record via the TimestampExtractor interface
- these per-record timestamps drive the progress of the stream and are leveraged by time-dependent operations such as window operations
Whenever Kafka Streams writes records to Kafka, it will also assign timestamps to these new records
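A minimal sketch of the extraction idea, in plain Java so it compiles without Kafka on the classpath. The real TimestampExtractor receives the consumer record and the current partition time; here the record is reduced to an embedded timestamp, and the names TimestampSketch and eventTimeOrPartitionTime are made up for illustration. Falling back to partition time on an invalid timestamp is one possible policy, not the only one.

```java
// Plain-Java model of timestamp extraction; not the Kafka Streams interface.
final class TimestampSketch {
    // Event-time semantics: use the timestamp embedded in the record.
    // A negative value is treated as "no valid event time" (an assumption of
    // this sketch), in which case the partition time is used instead.
    static long eventTimeOrPartitionTime(long embeddedTs, long partitionTime) {
        return embeddedTs >= 0 ? embeddedTs : partitionTime;
    }

    // Processing-time semantics: ignore the record and read the wall clock.
    static long processingTime() {
        return System.currentTimeMillis();
    }
}
```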
Timestamps depend on context
- Output records generated by processing some input record, e.g. context.forward() triggered in the process() call: o/p record timestamps are inherited from i/p record timestamps directly
- Output records generated via periodic functions such as Punctuator.punctuate(): the o/p record timestamp is defined as the current internal time (obtained by context.timestamp()) of the stream task
- For aggregations, the resulting timestamp will be the maximum timestamp of all i/p records contributing to the result
For aggregations & joins, timestamps are computed by using the following rules
- Joins: have left and right input records, take max(left.ts, right.ts)
- For stream-table joins, the o/p record is assigned the timestamp from the stream record
- For aggregations, Kafka Streams also computes the max timestamp over all records, per key, either globally (for non-windowed) or per-window.
- For stateless operations, the input record timestamp is passed through.
- For flatMap and siblings that emit multiple records, all output records inherit the timestamp from the corresponding input record.
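The rules above are small enough to write down as functions. The sketch below is plain Java with made-up names (OutputTimestampRules and its methods); it restates the rules and does not call into Kafka Streams.

```java
import java.util.List;

// The output-timestamp rules restated as plain functions (illustrative names).
final class OutputTimestampRules {
    // Joins with a left and a right input record: the later timestamp wins.
    static long join(long leftTs, long rightTs) {
        return Math.max(leftTs, rightTs);
    }

    // Stream-table join: the output takes the stream record's timestamp.
    static long streamTableJoin(long streamTs, long tableTs) {
        return streamTs;
    }

    // Aggregation: maximum timestamp over all contributing input records.
    static long aggregate(List<Long> inputTimestamps) {
        if (inputTimestamps.isEmpty()) {
            throw new IllegalArgumentException("an aggregate needs at least one input record");
        }
        long max = Long.MIN_VALUE;
        for (long ts : inputTimestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    // Stateless operations, flatMap included: the input timestamp passes through.
    static long stateless(long inputTs) {
        return inputTs;
    }
}
```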
Streams & Tables
- Kafka Streams API provides an abstraction over streams & tables
- Allows for
  - making applications elastic
  - fault-tolerant stateful processing
  - running interactive queries
Stream as table
- view stream as a changelog of a table
- each stream record is a change in state of the table
- easy to aggregate
Table as a stream
- a table can be considered a snapshot of the latest value for each key in a stream
- a table is thus a stream in disguise; it can be easily turned into a real stream by iterating over each key-value entry in the table
Use : KStream, KTable, GlobalKTable
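The duality can be tried out with ordinary collections. In this sketch a list of key-value entries plays the stream and a map plays the table; DualitySketch, toTable and toStream are illustrative names, not KStream or KTable methods.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stream-table duality modelled with a List (stream) and a Map (table).
final class DualitySketch {
    // Stream as table: replay the changelog; a later record for a key
    // overwrites the earlier one, leaving the latest value per key.
    static Map<String, Integer> toTable(List<Map.Entry<String, Integer>> stream) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> change : stream) {
            table.put(change.getKey(), change.getValue());
        }
        return table;
    }

    // Table as stream: iterate over the entries and emit one record per key.
    static List<Map.Entry<String, Integer>> toStream(Map<String, Integer> table) {
        List<Map.Entry<String, Integer>> stream = new ArrayList<>();
        for (Map.Entry<String, Integer> row : table.entrySet()) {
            stream.add(Map.entry(row.getKey(), row.getValue()));
        }
        return stream;
    }
}
```

Replaying alice=1, bob=5, alice=3 leaves a table with two rows, because the second alice record replaces the first.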
Aggregations
- takes one input stream or table, and yields a new table by combining multiple input records into a single output record
- in the Kafka Streams DSL, the input of an aggregation can be a KStream or a KTable, but the output will always be a KTable
- this allows Kafka Streams to update an aggregate value upon out-of-order arrival of further records
- when such an out-of-order arrival happens, the aggregating KStream/KTable emits a new aggregate value
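A rough model of why a table is the natural output of an aggregation: every input record, including one that arrives out of order, updates the row for its key and produces a new aggregate value. SumAggregation and its methods are made-up names, and the sum is just one possible aggregate.

```java
import java.util.HashMap;
import java.util.Map;

// Per-key running sum; each call models one input record and returns the
// updated aggregate, which is what would be emitted downstream.
final class SumAggregation {
    private final Map<String, Long> sums = new HashMap<>();
    private final Map<String, Long> maxTimestamps = new HashMap<>();

    long process(String key, long value, long timestamp) {
        // The result timestamp is the max over all contributing records,
        // so an out-of-order record does not move it backwards.
        maxTimestamps.merge(key, timestamp, Long::max);
        return sums.merge(key, value, Long::sum);
    }

    long timestampOf(String key) {
        return maxTimestamps.get(key);
    }
}
```

A record with timestamp 200 that arrives after one with timestamp 300 still changes the sum, while the aggregate's timestamp stays at 300.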
Windowing
- group records that have the same key for stateful operations such as aggregations or joins
- Windows are tracked per record key
- Windowing operations are available in the Streams DSL
- Grace Period: controls how long Kafka Streams will wait for an out-of-order data record
  - If a record arrives after the grace period has passed, the record is discarded
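A sketch of how a grace period could work for fixed-size, non-overlapping windows. This is a simplified plain-Java model, not the Kafka Streams implementation: TumblingWindowCounter is a made-up class, timestamps are assumed to be non-negative, and "stream time" is taken to be the highest timestamp seen so far.

```java
import java.util.HashMap;
import java.util.Map;

// Counts records per key and window, discarding records whose window
// closed more than graceMs ago (measured in stream time).
final class TumblingWindowCounter {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;                 // highest timestamp observed
    private final Map<String, Long> counts = new HashMap<>(); // "key@windowStart" -> count

    TumblingWindowCounter(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Returns true if the record was counted, false if it was discarded.
    boolean process(String key, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - (timestamp % sizeMs);
        long windowEnd = windowStart + sizeMs;                // exclusive end
        if (windowEnd + graceMs <= streamTime) {
            return false;                                     // window and grace are both over
        }
        counts.merge(key + "@" + windowStart, 1L, Long::sum);
        return true;
    }

    long count(String key, long windowStart) {
        return counts.getOrDefault(key + "@" + windowStart, 0L);
    }
}
```

With a window size of 10 and a grace of 5, a record for window [0, 10) is still accepted while stream time is 12, and rejected once stream time has reached 16.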
Windowing Types :
States
- If state is not required: processing of a record is independent of other records
- If state is required: processing of a record might require information from other records
- State in Kafka Streams is held in state stores, which can be used by stream processing applications to store and query data
- Each task embeds one or more state stores that can be queried
- Offers fault tolerance and automatic recovery for local state stores
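Recovery of a local store can be pictured as replaying a log of its updates. Kafka Streams backs local stores with changelog topics; in the sketch below an in-memory list stands in for such a topic, and ChangelogBackedStore is a made-up class rather than the real state store interface.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Key-value store that appends every write to a changelog and rebuilds
// its local contents from that changelog when it is created.
final class ChangelogBackedStore {
    private final Map<String, String> local = new HashMap<>();
    private final List<String[]> changelog;   // stand-in for a changelog topic

    ChangelogBackedStore(List<String[]> changelog) {
        this.changelog = changelog;
        for (String[] entry : changelog) {    // recovery: replay earlier writes in order
            local.put(entry[0], entry[1]);
        }
    }

    void put(String key, String value) {
        changelog.add(new String[] {key, value});
        local.put(key, value);
    }

    String get(String key) {
        return local.get(key);
    }
}
```

Creating a second store over the same changelog models a task that restarts elsewhere after a failure: it sees the latest value for every key.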
Processing Guarantees
- Exactly Once
- At least Once
- At most Once
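In Kafka Streams the guarantee is chosen through the processing.guarantee setting (StreamsConfig.PROCESSING_GUARANTEE_CONFIG). The sketch below builds the properties with plain string keys so it compiles without Kafka; the application id and broker address are placeholders. As far as I know the accepted values are at_least_once (the default) and exactly_once_v2 (Kafka 3.0 and later), with no at-most-once option, so treat the value names as something to check against the version in use.

```java
import java.util.Properties;

// Builds Streams properties with the chosen processing guarantee.
final class GuaranteeConfigSketch {
    static Properties streamsProps(boolean exactlyOnce) {
        Properties props = new Properties();
        props.put("application.id", "demo-app");           // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        // Same key as StreamsConfig.PROCESSING_GUARANTEE_CONFIG.
        props.put("processing.guarantee", exactlyOnce ? "exactly_once_v2" : "at_least_once");
        return props;
    }
}
```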
Stream Partitions and Tasks
Intro
- enables data locality, elasticity, scalability, high performance and fault tolerance
- Kafka Streams uses the concepts of partitions and tasks to introduce parallelism
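A simplified picture of how partitions bound parallelism: one task per input partition, with tasks spread over the available threads. The round-robin assignment here is an assumption made for illustration (the real assignor is more involved), and TaskAssignmentSketch is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// One task per partition, distributed round-robin over the threads.
final class TaskAssignmentSketch {
    // Returns thread index -> partition (task) numbers handled by that thread.
    static Map<Integer, List<Integer>> assign(int partitions, int threads) {
        Map<Integer, List<Integer>> byThread = new TreeMap<>();
        for (int t = 0; t < threads; t++) {
            byThread.put(t, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            byThread.get(p % threads).add(p);
        }
        return byThread;
    }
}
```

With more threads than partitions the extra threads get no tasks, which is why the partition count caps the useful parallelism.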
Sample Code Snippets
Just a simple Kafka Streams Class
- The Topology
  - reads the given topic into a KStream
  - logs each record value via foreach
    package org.example.streamconsumer;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Properties;

    public class StreamConsumer {
        // Log under this class's own name
        private static final Logger log = LoggerFactory.getLogger(StreamConsumer.class);
        private final KafkaStreams kafkaStreams;

        public void run() {
            // this starts the KafkaStreams
            this.kafkaStreams.start();
        }

        public StreamConsumer(String topic) {
            // Builder used to define the topology for Streams
            StreamsBuilder streamsBuilder = new StreamsBuilder();

            // Define the topology: read the topic as a stream of String key-value records
            KStream<String, String> listeningStream = streamsBuilder.stream(topic);

            // Terminal operation: log the value of every record
            listeningStream.foreach(
                (key, value) -> log.info("[StreamConsumer] Got message : {}", value)
            );

            // build the topology of given Streams Graph
            Topology topology = streamsBuilder.build();

            // Properties for the Streams Config
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shreyas.demo.app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093,localhost:9094,localhost:9095");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            // Instantiate the Streams
            this.kafkaStreams = new KafkaStreams(topology, props);
        }
    }